package com.blackcat.rabbitmq.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * 描述 ：配置文件类
 * @author : zhangdahui
 * @date : 2022/8/11 10:04
 *
 */
@Slf4j
@Configuration
public class RabbitConfig {

    // 交换机名
    private static final String EXCHANGE_NAME = "fanout-order-exchange";
    // 队列名
    private static final String SMS_QUEUE = "sms-fanout-queue";
    private static final String EMAIL_QUEUE = "email-fanout-queue";
    private static final String WECHAT_QUEUE = "wechat-fanout-queue";

    // ----------------------------------模拟三个消息队列绑定到一个发布订阅交换机上--------------------------------
    /**
     * 1.
     * 声明交换机
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        /**
         * FanoutExchange的参数说明:
         * 1. 交换机名称
         * 2. 是否持久化  true：持久化，交换机一直保留    false：不持久化，用完就删除
         * 3. 是否自动删除  false：不自动删除    true：自动删除
         */
        return new FanoutExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * 2.
     * 声明队列
     */
    @Bean
    public Queue smsQueue() {
        /**
         * Queue构造函数参数说明
         * 1. 队列名
         * 2. 是否持久化   true：持久化    false：不持久化
         */
        return new Queue(SMS_QUEUE, true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue(EMAIL_QUEUE, true);
    }

    @Bean
    public Queue wechatQueue() {
        return new Queue(WECHAT_QUEUE, true);
    }

    /**
     * 3.
     * 队列与交换机绑定
     */
//    @Bean
//    public Binding smsBinding() {
//        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
//    }
//
//    @Bean
//    public Binding emailBinding() {
//        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
//    }
//
//    @Bean
//    public Binding wechatBinding() {
//        return BindingBuilder.bind(wechatQueue()).to(fanoutExchange());
//    }
    // ----------------------------------模拟三个消息队列绑定到一个发布订阅交换机上--------------------------------

// ----------------------------------路由模式--------------------------------
    @Bean("directExchange")
    public Exchange exchange(){
        return ExchangeBuilder.directExchange("directExchange").build();
    }

    @Bean
    public Binding smsBinding(@Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder
                .bind(smsQueue())
                .to(exchange)
                .with("sms")
                .noargs();
    }

    @Bean
    public Binding emailBinding(@Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder
                .bind(emailQueue())
                .to(exchange)
                .with("email")
                .noargs();
    }

    @Bean
    public Binding wechatBinding(@Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder
                .bind(wechatQueue())
                .to(exchange)
                .with("wechat")
                .noargs();
    }
// ----------------------------------路由模式--------------------------------

    /**
     * 转换接收对象的数据格式，默认是java序列化的格式
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

//    @Autowired
//    private ConfirmReturn confirmReturn;
//    @Autowired
//    private MessageReturn messageReturn;

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
//        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
//        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
//        rabbitTemplate.setReturnCallback(messageReturn);
//        rabbitTemplate.setConfirmCallback(confirmReturn);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("返回消息回调:{} 应答代码:{} 回复文本:{} 交换器:{} 路由键:{}", message, replyCode, replyText, exchange, routingKey);
        });
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("回调id:" + correlationData.getId());
            if (ack) {
                log.info("消息成功发送到exchange:"+correlationData);
            } else {
                log.error("消息发送exchange失败:" + cause);
            }
        });
        return rabbitTemplate;
    }

    @Bean
    public FanoutExchange aaaaExchange() {
        return new FanoutExchange("aaaa", true, false);
    }

    @Bean
    public Queue qqqqq() {
        return new Queue("qqqqq", true);
    }

    @Bean
    public Queue bbbb() {
        return new Queue("bbbb", true);
    }

    @Bean
    public Binding qqqqqBinding() {
        return BindingBuilder.bind(qqqqq()).to(aaaaExchange());
    }

    @Bean
    public Binding bbbbBinding() {
        return BindingBuilder.bind(bbbb()).to(aaaaExchange());
    }
}
